-
Notifications
You must be signed in to change notification settings - Fork 336
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
to #103, Support PB format serializer in Kafka Source #185
base: master
Are you sure you want to change the base?
to #103, Support PB format serializer in Kafka Source #185
Conversation
hi @qidian99 , thanks for your contribution, looks like there is a conflict. Would you rebase the latest master? |
2cb8936
to
40983a3
Compare
Done rebasing. @garyli1019 |
40983a3
to
bc60582
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your contribution! left some comments
@@ -0,0 +1,970 @@ | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we add the license header?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class is generated by protoc, which isn't dynamic. I removed it and made the test case directly read proto descriptor for job configuration.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's ok to put this class in our codebase, because if we generate this class from the test case, this class will be recognized by git and added to the git commit for those who run the test case. It's tricky to add this to .gitignore
as well. Add the license header on an auto generated file is fine I believe.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nvm, my concern didn't happen at all.
@@ -64,6 +65,14 @@ public static KafkaDeserializationSchema<Row> getDeserializationSchema(BitSailCo | |||
.build()); | |||
} | |||
|
|||
if (StringUtils.equalsIgnoreCase(PB_DESERIALIZATION_SCHEMA_KEY, formatType)) { | |||
try { | |||
return new PbKafkaDeserializationSchema(configuration); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we use the CountKafkaDeserializationSchemaWrapper
like json, this wrapper is useful to run our regression test for streaming mode.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, PbKafkaDeserializationSchema is renamed to PbDeserializationSchema and extends Flink's deserialization schema
import java.util.List; | ||
|
||
@Internal | ||
public class PbKafkaDeserializationSchema implements KafkaDeserializationSchema<Row> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
with the wrapper, this class could be more general which not only work for kafka. Can we move this to bitsail-components/bitsail-component-formats/bitsail-component-format-pb
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tried generalizing PbDeserializationSchema. Already created empty module named bitsail-component-format-pb
. However, there are some problems:
- the wrapper takes in org.apache.flink.api.common.serialization.DeserializationSchema while deserialization schemas in bitsail-component-formats implements com.bytedance.bitsail.base.format.DeserializationSchema<I, O>
- v1 connectors are not using spi to get deserialization schemas, e.g., RocketMQDeserializationSchema
IMHO, It is better to create another issue for bitsail-component-formats implementations
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it, I agree with you here! Please feel free to create an issue if you're interested with the follow up.
} | ||
|
||
@Override | ||
public Row deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we can UT for this, I believe all the supported types should be covered
}, 0, 1, TimeUnit.SECONDS); | ||
} | ||
|
||
@SneakyThrows |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure why we need this SneakyThrows
here, would you elaborate?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed. Thanks for pointing out!
@@ -0,0 +1,7 @@ | |||
syntax = "proto3"; | |||
|
|||
message ProtoTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd suggest we include all supported types here, including some complex type.
"update-mode": "append" | ||
}, | ||
"proto": { | ||
"descriptor": "", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This json sample should be runnable. It's ok to add the pb info from the integration test you write.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added base64 encoded descriptor and class name
} | ||
}, 0, 1, TimeUnit.SECONDS); | ||
} | ||
|
||
@Test | ||
public void testKafkaSource() throws Exception { | ||
public void testKafkaSourceJsonFormat() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I ran this ITCase and one test was failed. looks like the kafka producer didn't stop when we switch to the second test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
87e5e04
to
a46ffa4
Compare
a46ffa4
to
00fca9e
Compare
👏 Great work! |
Signed-off-by:
to #103, Support PB format serializer in Kafka Source